Platform Explorer / Nuxeo Platform 2023.6

Extension point streamProcessor

Contribution Descriptors

  • Class: org.nuxeo.runtime.stream.StreamProcessorDescriptor

Existing Contributions

Contributions are listed in the order in which they were registered on this extension point. The registration index is displayed in brackets before each contribution name.

  • nuxeo-runtime-stream-2023.6.12.jar /OSGI-INF/stream-service.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Stream metrics processor: declares one avro-encoded stream and one computation. -->
      <streamProcessor name="metrics"
                       class="org.nuxeo.runtime.stream.StreamMetricsProcessor"
                       enabled="true">
        <!-- Retries are sized to span roughly 30 minutes so that the processor
             survives an MSK rolling upgrade. -->
        <policy name="default" maxRetries="18" delay="15s" maxDelay="120s" continueOnFailure="false"/>
        <stream name="input/null" codec="avro" partitions="1"/>
        <computation name="stream/metrics" concurrency="1"/>
      </streamProcessor>
    </extension>
  • nuxeo-coldstorage-2023.2.3.jar /OSGI-INF/coldstorage-bulk-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Cold storage actions. All four processors share the same sizing
           (defaultConcurrency 2, defaultPartitions 4) and the same "default"
           policy (maxRetries 3, delay 1s, maxDelay 10s, continueOnFailure true). -->
      <streamProcessor name="moveToColdStorage"
                       class="org.nuxeo.coldstorage.action.MoveToColdStorageContentAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>

      <streamProcessor name="propagateMoveToColdStorage"
                       class="org.nuxeo.coldstorage.action.PropagateMoveToColdStorageContentAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>

      <streamProcessor name="propagateRestoreFromColdStorage"
                       class="org.nuxeo.coldstorage.action.PropagateRestoreFromColdStorageContentAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>

      <!-- Only this processor sets defaultScroller explicitly. -->
      <streamProcessor name="checkColdStorageAvailability"
                       class="org.nuxeo.coldstorage.action.CheckColdStorageAvailabilityAction"
                       defaultConcurrency="2"
                       defaultPartitions="4"
                       defaultScroller="default">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-core-2023.6.12.jar /OSGI-INF/retention-and-hold-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Retention expiry action, single partition and single thread. -->
      <streamProcessor name="retentionExpired"
                       class="org.nuxeo.ecm.core.security.RetentionExpiredAction"
                       defaultConcurrency="1"
                       defaultPartitions="1">
        <!-- continueOnFailure is enabled on purpose: failing to expire a retention
             does not leave the system in an inconsistent state. -->
        <policy name="default" maxRetries="20" delay="1s" maxDelay="60s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-core-2023.6.12.jar /OSGI-INF/bulk-migration-action-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Bulk migration processor; declared with start="false". -->
      <streamProcessor name="migration"
                       class="org.nuxeo.ecm.core.migrator.AbstractBulkMigrator$MigrationAction"
                       defaultConcurrency="2"
                       defaultPartitions="4"
                       start="false">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>
    </extension>
  • nuxeo-core-2023.6.12.jar /OSGI-INF/core-domain-event-producer-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Two garbage-collection processors with identical sizing (avro codec,
           1 partition, concurrency 1) and identical "default" policy. -->
      <streamProcessor name="blobGC"
                       class="org.nuxeo.ecm.core.blob.stream.StreamOrphanBlobGC"
                       enabled="true"
                       defaultCodec="avro"
                       defaultConcurrency="1"
                       defaultPartitions="1">
        <policy name="default" maxRetries="3" delay="3s" maxDelay="60s" continueOnFailure="true"/>
      </streamProcessor>

      <streamProcessor name="documentGC"
                       class="org.nuxeo.ecm.core.model.stream.StreamDocumentGC"
                       enabled="true"
                       defaultCodec="avro"
                       defaultConcurrency="1"
                       defaultPartitions="1">
        <policy name="default" maxRetries="3" delay="3s" maxDelay="60s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-core-2023.6.12.jar /OSGI-INF/deletion-action-config.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Deletion processor: continues on failure after 3 retries. -->
      <streamProcessor name="deletion"
                       class="org.nuxeo.ecm.core.action.DeletionAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>

      <!-- Orphan blob garbage-collection processor: continueOnFailure is false here. -->
      <streamProcessor name="garbageCollectOrphanBlobs"
                       class="org.nuxeo.ecm.core.action.GarbageCollectOrphanBlobsAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>
    </extension>
  • nuxeo-core-2023.6.12.jar /OSGI-INF/orphanVersionsCleanup-listener-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Orphan versions garbage-collection processor. -->
      <streamProcessor name="garbageCollectOrphanVersions"
                       class="org.nuxeo.ecm.core.action.GarbageCollectOrphanVersionsAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>
    </extension>
  • nuxeo-drive-core-2023.6.12.jar /OSGI-INF/nuxeodrive-bulk-action-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Nuxeo Drive bulk action registered under the name driveFireGroupUpdatedEvent. -->
      <streamProcessor name="driveFireGroupUpdatedEvent"
                       class="org.nuxeo.drive.action.FireGroupUpdatedEventAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-platform-csv-export-2023.6.12.jar /OSGI-INF/csv-export-config.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- CSV export processor. -->
      <streamProcessor name="csvExport"
                       class="org.nuxeo.ecm.platform.csv.export.action.CSVExportAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
        <!-- The bulk/makeBlob stream carries an "overflow" record filter configured
             with the "default" store, the "csvoverflow" prefix and a thresholdSize
             of 990000. -->
        <stream name="bulk/makeBlob">
          <filter name="overflow"
                  class="org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter">
            <option name="storeName">default</option>
            <option name="prefix">csvoverflow</option>
            <option name="thresholdSize">990000</option>
          </filter>
        </stream>
        <option name="produceImmediate">true</option>
      </streamProcessor>
    </extension>
  • nuxeo-platform-imaging-core-2023.6.12.jar /OSGI-INF/imaging-bulk-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Picture views recomputation: 6 default partitions, a single retry. -->
      <streamProcessor name="recomputeViews"
                       class="org.nuxeo.ecm.platform.picture.recompute.RecomputeViewsAction"
                       defaultConcurrency="2"
                       defaultPartitions="6">
        <policy name="default" maxRetries="1" delay="5s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-platform-video-2023.6.12.jar /OSGI-INF/video-bulk-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Video conversions recomputation: 6 default partitions, a single retry. -->
      <streamProcessor name="recomputeVideoConversions"
                       class="org.nuxeo.ecm.platform.video.action.RecomputeVideoConversionsAction"
                       defaultConcurrency="2"
                       defaultPartitions="6">
        <policy name="default" maxRetries="1" delay="5s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-retention-2023.3.6.jar /OSGI-INF/retention-actions.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Retention add-on actions. Every processor below uses defaultConcurrency 2,
           defaultPartitions 4 and the same "default" policy
           (maxRetries 3, delay 1s, maxDelay 10s, continueOnFailure true). -->
      <streamProcessor name="holdDocumentsAction"
                       class="org.nuxeo.retention.actions.HoldDocumentsAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>

      <streamProcessor name="unholdDocumentsAction"
                       class="org.nuxeo.retention.actions.UnholdDocumentsAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>

      <streamProcessor name="attachRetentionRule"
                       class="org.nuxeo.retention.actions.AttachRetentionRuleAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>

      <streamProcessor name="evalInputEventBasedRule"
                       class="org.nuxeo.retention.actions.EvalInputEventBasedRuleAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>

      <streamProcessor name="processRetentionEvent"
                       class="org.nuxeo.retention.actions.ProcessRetentionEventAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-routing-core-2023.6.12.jar /OSGI-INF/document-routing-escalation-scheduler-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Document routing escalation action; the processor name matches the
           simple class name, including its capitalisation. -->
      <streamProcessor name="DocumentRoutingEscalationAction"
                       class="org.nuxeo.ecm.platform.routing.core.bulk.DocumentRoutingEscalationAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-routing-core-2023.6.12.jar /OSGI-INF/document-routing-cleanup-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Routes garbage-collection processor.
           NOTE(review): the registered name is spelled "garbageCollectWokflows";
           it is kept verbatim because it is a runtime identifier. Confirm with
           callers before renaming. -->
      <streamProcessor name="garbageCollectWokflows"
                       class="org.nuxeo.ecm.platform.routing.core.bulk.GarbageCollectRoutesAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>
    </extension>
  • nuxeo-platform-audit-core-2023.6.12.jar /OSGI-INF/nxaudit-service.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Audit writer: avro codec, 1 partition, concurrency 1. -->
      <streamProcessor name="auditWriter"
                       class="org.nuxeo.ecm.platform.audit.impl.StreamAuditWriter"
                       enabled="true"
                       defaultCodec="avro"
                       defaultConcurrency="1"
                       defaultPartitions="1">
        <!-- Besides the retry settings, this policy also sets batching attributes
             (batchCapacity 25, batchThreshold 500ms). -->
        <policy name="default"
                batchCapacity="25"
                batchThreshold="500ms"
                maxRetries="20"
                delay="1s"
                maxDelay="60s"
                continueOnFailure="false"/>
      </streamProcessor>
    </extension>
  • nuxeo-core-bulk-2023.6.12.jar /OSGI-INF/bulk-config.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Bulk service processor; declared with start="false" and
           defaultExternal="true". -->
      <streamProcessor name="bulkServiceProcessor"
                       class="org.nuxeo.ecm.core.bulk.BulkServiceProcessor"
                       start="false"
                       defaultCodec="avro"
                       defaultConcurrency="1"
                       defaultPartitions="1"
                       defaultExternal="true">
        <!-- These three streams override the processor default with external="false". -->
        <stream name="bulk/command" external="false"/>
        <stream name="bulk/status" external="false"/>
        <stream name="bulk/done" external="false"/>
        <!-- Per-computation policies: bulk/scroller has maxRetries 0,
             bulk/status has maxRetries 20. -->
        <policy name="bulk/scroller" maxRetries="0" delay="1s" maxDelay="60s" continueOnFailure="false"/>
        <policy name="bulk/status" maxRetries="20" delay="1s" maxDelay="60s" continueOnFailure="false"/>
        <computation name="bulk/scroller" concurrency="2"/>
        <computation name="bulk/status" concurrency="1"/>
      </streamProcessor>

      <!-- Stream introspection processor; no policy, stream or computation overrides. -->
      <streamProcessor name="streamIntrospection"
                       class="org.nuxeo.ecm.core.bulk.introspection.StreamIntrospectionProcessor"
                       enabled="true"
                       defaultCodec="avro"
                       defaultConcurrency="1"
                       defaultPartitions="1"/>
    </extension>
  • nuxeo-core-bulk-2023.6.12.jar /OSGI-INF/bulk-config.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Core bulk actions. All four share the same "default" policy
           (maxRetries 3, delay 500ms, maxDelay 10s, continueOnFailure false). -->

      <!-- Set properties -->
      <streamProcessor name="setProperties"
                       class="org.nuxeo.ecm.core.bulk.action.SetPropertiesAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>

      <!-- Set system properties -->
      <streamProcessor name="setSystemProperties"
                       class="org.nuxeo.ecm.core.bulk.action.SetSystemPropertiesAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>

      <!-- Remove proxy -->
      <streamProcessor name="removeProxy"
                       class="org.nuxeo.ecm.core.bulk.action.RemoveProxyAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>

      <!-- Trash: unlike the others, 1 partition and concurrency 1 -->
      <streamProcessor name="trash"
                       class="org.nuxeo.ecm.core.bulk.action.TrashAction"
                       defaultConcurrency="1"
                       defaultPartitions="1">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>
    </extension>
  • nuxeo-automation-features-2023.6.12.jar /OSGI-INF/bulk-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Automation bulk actions. Neither policy sets maxDelay, so whatever
           default the stream service applies is in effect. -->
      <streamProcessor name="automation"
                       class="org.nuxeo.ecm.automation.core.operations.services.bulk.AutomationBulkAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" continueOnFailure="true"/>
      </streamProcessor>

      <!-- UI variant: same sizing and policy, plus the failOnError option set to false. -->
      <streamProcessor name="automationUi"
                       class="org.nuxeo.ecm.automation.core.operations.services.bulk.AutomationBulkActionUi"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" continueOnFailure="true"/>
        <option name="failOnError">false</option>
      </streamProcessor>
    </extension>
  • nuxeo-core-storage-2023.6.12.jar /OSGI-INF/bulk-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Binary fulltext extraction action; the policy does not set maxDelay. -->
      <streamProcessor name="extractBinaryFulltext"
                       class="org.nuxeo.ecm.core.storage.action.ExtractBinaryFulltextAction"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="1s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-elasticsearch-core-2023.6.12.jar /OSGI-INF/bulk-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Elasticsearch indexing action. -->
      <streamProcessor name="indexAction"
                       class="org.nuxeo.elasticsearch.bulk.IndexAction"
                       enabled="true"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="20" delay="1s" maxDelay="60s" continueOnFailure="false"/>

        <!-- Stage 1: fetch content and build the indexing requests. -->
        <computation name="bulk/index" concurrency="4"/>
        <stream name="bulk/index" partitions="12"/>

        <!-- Stage 2: submit the requests to Elasticsearch. The stream carries an
             "overflow" record filter ("default" store, "index" prefix,
             thresholdSize 990000). -->
        <computation name="bulk/bulkIndex" concurrency="2"/>
        <stream name="bulk/bulkIndex" partitions="8">
          <filter name="overflow"
                  class="org.nuxeo.ecm.core.transientstore.computation.TransientStoreOverflowRecordFilter">
            <option name="storeName">default</option>
            <option name="prefix">index</option>
            <option name="thresholdSize">990000</option>
          </filter>
        </stream>

        <computation name="bulk/indexCompletion" concurrency="1"/>

        <!-- Optimal Elasticsearch bulk request size, in bytes (5242880 = 5 MiB). -->
        <option name="esBulkSizeBytes">5242880</option>
        <!-- Upper bound on the number of actions per Elasticsearch bulk request. -->
        <option name="esBulkActions">1000</option>
        <!-- Interval, in seconds, between flushes of the Elasticsearch bulk request. -->
        <option name="flushIntervalSeconds">5</option>
      </streamProcessor>
    </extension>
  • nuxeo-core-binarymanager-s3-2023.6.12.jar /OSGI-INF/bulk-config.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- S3 blob length action; note that it is declared with enabled="false". -->
      <streamProcessor name="s3SetBlobLength"
                       class="org.nuxeo.ecm.core.bulk.S3SetBlobLengthAction"
                       enabled="false"
                       defaultConcurrency="2"
                       defaultPartitions="4">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>
  • nuxeo-core-storage-dbs-2023.6.12.jar /OSGI-INF/dbs-bulk-config.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Read ACLs update processor: 1 partition, concurrency 1. -->
      <streamProcessor name="updateReadAcls"
                       class="org.nuxeo.ecm.core.storage.dbs.action.UpdateReadAclsAction"
                       defaultConcurrency="1"
                       defaultPartitions="1">
        <policy name="default" maxRetries="3" delay="500ms" maxDelay="10s" continueOnFailure="false"/>
      </streamProcessor>
    </extension>
  • nuxeo-thumbnail-2023.6.12.jar /OSGI-INF/thumbnail-bulk-contrib.xml
    <extension target="org.nuxeo.runtime.stream.service" point="streamProcessor">
      <!-- Thumbnail recomputation: 6 default partitions, a single retry. -->
      <streamProcessor name="recomputeThumbnails"
                       class="org.nuxeo.ecm.platform.thumbnail.action.RecomputeThumbnailsAction"
                       defaultConcurrency="2"
                       defaultPartitions="6">
        <policy name="default" maxRetries="1" delay="5s" maxDelay="10s" continueOnFailure="true"/>
      </streamProcessor>
    </extension>